
feat: separate out write path executor with unbounded memory limit #26455


Merged: 1 commit merged into main on Jun 2, 2025

Conversation


@praveen-influx praveen-influx commented May 23, 2025

Currently, when there is an OOM while snapshotting, the process keeps going without crashing. This behaviour is observed on main (commit: be25c6f). As a result, the WAL files keep accumulating to the point where a restart can never replay all of them.

This happens because of how memory is distributed. In Enterprise especially, there is no reason for an ingester to be allocated only 20% of memory for the DataFusion memory pool (which runs the snapshot), since the Parquet cache is not in use at all. That 20% is too conservative for an ingester, so instead of redistributing the memory settings based on the mode the process is running in, this commit introduces a separate write path executor with no bound on memory (it still uses a GreedyMemoryPool under the hood, with usize::MAX as the upper limit). This means that if the write path executor ever does run out of memory, it will be a true out-of-memory condition that stops the whole process, rather than a pool error that the process silently survives.

It is also important to let the snapshotting process use as much memory as it needs; without that, the buffer keeps growing and runs into an OOM anyway.
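
To make the memory-pool behaviour concrete, here is a minimal sketch using DataFusion's memory pool types directly (an illustration of the idea, not the PR's actual executor wiring; the consumer name simply mirrors the one that appears in the error logs below):

use std::sync::Arc;

use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() {
    // A bounded pool, like the default 20% executor pool (200M in the repro below):
    // a large sort reservation fails with "Resources exhausted".
    let bounded: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(200_000_000));
    let mut reservation = MemoryConsumer::new("ExternalSorter[0]").register(&bounded);
    assert!(reservation.try_grow(369_485_056).is_err());

    // The write path executor's pool: still a GreedyMemoryPool, but with usize::MAX
    // as the limit, so the pool itself never rejects an allocation. The only thing
    // that can stop it is the process genuinely running out of memory.
    let unbounded: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(usize::MAX));
    let mut reservation = MemoryConsumer::new("ExternalSorter[0]").register(&unbounded);
    assert!(reservation.try_grow(369_485_056).is_ok());
}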

closes: #26422

Test

  • It wasn't easy to trigger the error condition through e2e tests; setting the max memory very low at the process level in TestServer does not yield reproducible runs. Perhaps a separate, longer-running suite should be considered.
  • Instead, the error was reproduced manually with the following settings:
    • 1G max memory
    • 500M for forcing a snapshot (50% of max memory, the default)
    • 200M for the DF executor (20% of max memory, the default)
  • All the load was generated using ./target/quick-release/influxdb3_load_generator write-fixed --tput 5.42 -w 40 (note: write-fixed is a sub-command of the load generator in Enterprise, currently sitting in this PR; once it's merged I'll port it back to core)

Main branch

  • the main branch runs into the problem at 5.42 MB/s (the snapshot hits an OOM error, but the process keeps receiving writes and appending them to the WAL)
  • config: 1G max memory, 500M force-snapshot threshold, 200M DF executor pool
  • actual command:
systemd-run --scope  -p MemoryMax=1000M -p CPUQuota=25% ./target/quick-release/influxdb3-main serve --node-id node-1 --object-store file --data-dir /home/praveen/projects/influx/test-data/core-perf  --disable-telemetry-upload --snapshotted-wal-files-to-keep 10 --force-snapshot-mem-threshold 500 --exec-mem-pool-bytes 200 --log-filter 'info,iox_query=debug,influxdb3_server::query_executor=warn,influxdb3_server::http=warn,influxdb3_wal=debug,influxdb3_write::write_buffer::queryable_buffer=debug,influxdb3_write::write_buffer::table_buffer=debug,influxdb3::write_buffer=debug,influxdb3_enterprise=debug' --gen1-duration 10m --without-auth
  • logs showing memory pool exhaustion while writes keep being received (the snapshot permit is never dropped here, which means no snapshot can run after this error)
2025-05-29T10:19:36.355457Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=107 num_snapshots_after=900
2025-05-29T10:19:36.355466Z  INFO influxdb3_wal::snapshot_tracker: snapshotting all force_snapshot=true wal_periods_3_times_snapshot_size=false
2025-05-29T10:19:36.355477Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748513976355451817 max_timestamp_ns=1748513976355451818 wal_file_number=107
2025-05-29T10:19:36.355575Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 106
2025-05-29T10:19:36.372688Z  INFO influxdb3_wal::object_store: snapshotting wal snapshot_details=SnapshotDetails { snapshot_sequence_number: SnapshotSequenceNumber(1), end_time_marker: 1748514000000000000, first_wal_sequence_number: WalFileSequenceNumber(1), last_wal_sequence_number: WalFileSequenceNumber(107), forced: true }
2025-05-29T10:19:36.372703Z  INFO influxdb3_write::write_buffer::queryable_buffer: Buffering contents and persisting snapshotted data snapshot_details=SnapshotDetails { snapshot_sequence_number: SnapshotSequenceNumber(1), end_time_marker: 1748514000000000000, first_wal_sequence_number: WalFileSequenceNumber(1), last_wal_sequence_number: WalFileSequenceNumber(107), forced: true }
2025-05-29T10:19:36.373338Z  INFO influxdb3_write::write_buffer::queryable_buffer: persisting 2 chunks for wal number 107
2025-05-29T10:19:36.373355Z  INFO influxdb3_write::write_buffer::queryable_buffer: Persisting 4240000 rows for db id 1 and table id 1 and chunk 1748513400000000000 to file node-1/dbs/foo-1/mem-1/2025-05-29/10-10/0000000107.parquet
2025-05-29T10:19:36.373687Z DEBUG iox_query::frontend::reorg: created compact plan for table table_name="mem" table_id=0 plan=Sort: mem.host ASC NULLS FIRST, mem.region ASC NULLS FIRST, mem.city ASC NULLS FIRST, mem.planet ASC NULLS FIRST, mem.time ASC NULLS FIRST [host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, city:Dictionary(Int32, Utf8);N, planet:Dictionary(Int32, Utf8);N, usage:Float64;N, free:Float64;N, time:Timestamp(Nanosecond, None)]
  TableScan: mem [host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, city:Dictionary(Int32, Utf8);N, planet:Dictionary(Int32, Utf8);N, usage:Float64;N, free:Float64;N, time:Timestamp(Nanosecond, None)]
2025-05-29T10:19:36.373731Z DEBUG iox_query::exec::context: create_physical_plan: initial plan text=Sort: mem.host ASC NULLS FIRST, mem.region ASC NULLS FIRST, mem.city ASC NULLS FIRST, mem.planet ASC NULLS FIRST, mem.time ASC NULLS FIRST [host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, city:Dictionary(Int32, Utf8);N, planet:Dictionary(Int32, Utf8);N, usage:Float64;N, free:Float64;N, time:Timestamp(Nanosecond, None)]
  TableScan: mem [host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, city:Dictionary(Int32, Utf8);N, planet:Dictionary(Int32, Utf8);N, usage:Float64;N, free:Float64;N, time:Timestamp(Nanosecond, None)]
2025-05-29T10:19:36.375121Z DEBUG iox_query::exec::context: create_physical_plan: plan to run text=ProjectionExec: expr=[host@0 as host, region@1 as region, city@2 as city, planet@3 as planet, usage@4 as usage, free@5 as free, time@6 as time]
  DeduplicateExec: [host@0 ASC,region@1 ASC,city@2 ASC,planet@3 ASC,time@6 ASC]
    SortExec: expr=[host@0 ASC, region@1 ASC, city@2 ASC, planet@3 ASC, time@6 ASC, __chunk_order@7 ASC], preserve_partitioning=[false]
      RecordBatchesExec: chunks=1, projection=[host, region, city, planet, usage, free, time, __chunk_order]

2025-05-29T10:19:36.375151Z DEBUG iox_query::exec::context: Running plan, physical:
ProjectionExec: expr=[host@0 as host, region@1 as region, city@2 as city, planet@3 as planet, usage@4 as usage, free@5 as free, time@6 as time]
  DeduplicateExec: [host@0 ASC,region@1 ASC,city@2 ASC,planet@3 ASC,time@6 ASC]
    SortExec: expr=[host@0 ASC, region@1 ASC, city@2 ASC, planet@3 ASC, time@6 ASC, __chunk_order@7 ASC], preserve_partitioning=[false]
      RecordBatchesExec: chunks=1, projection=[host, region, city, planet, usage, free, time, __chunk_order]

2025-05-29T10:19:36.375173Z  INFO influxdb3_write::write_buffer::queryable_buffer: Persisting 4240000 rows for db id 1 and table id 0 and chunk 1748513400000000000 to file node-1/dbs/foo-1/cpu-0/2025-05-29/10-10/0000000107.parquet
2025-05-29T10:19:36.375340Z DEBUG iox_query::frontend::reorg: created compact plan for table table_name="cpu" table_id=0 plan=Sort: cpu.host ASC NULLS FIRST, cpu.region ASC NULLS FIRST, cpu.city ASC NULLS FIRST, cpu.planet ASC NULLS FIRST, cpu.time ASC NULLS FIRST [host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, city:Dictionary(Int32, Utf8);N, planet:Dictionary(Int32, Utf8);N, usage:Float64;N, temp:Float64;N, time:Timestamp(Nanosecond, None)]
  TableScan: cpu [host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, city:Dictionary(Int32, Utf8);N, planet:Dictionary(Int32, Utf8);N, usage:Float64;N, temp:Float64;N, time:Timestamp(Nanosecond, None)]
2025-05-29T10:19:36.375351Z DEBUG iox_query::exec::context: create_physical_plan: initial plan text=Sort: cpu.host ASC NULLS FIRST, cpu.region ASC NULLS FIRST, cpu.city ASC NULLS FIRST, cpu.planet ASC NULLS FIRST, cpu.time ASC NULLS FIRST [host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, city:Dictionary(Int32, Utf8);N, planet:Dictionary(Int32, Utf8);N, usage:Float64;N, temp:Float64;N, time:Timestamp(Nanosecond, None)]
  TableScan: cpu [host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, city:Dictionary(Int32, Utf8);N, planet:Dictionary(Int32, Utf8);N, usage:Float64;N, temp:Float64;N, time:Timestamp(Nanosecond, None)]
2025-05-29T10:19:36.375456Z DEBUG iox_query::provider::deduplicate: End building stream for DeduplicationExec::execute partition=0
2025-05-29T10:19:36.376098Z DEBUG iox_query::exec::context: create_physical_plan: plan to run text=ProjectionExec: expr=[host@0 as host, region@1 as region, city@2 as city, planet@3 as planet, usage@4 as usage, temp@5 as temp, time@6 as time]
  DeduplicateExec: [host@0 ASC,region@1 ASC,city@2 ASC,planet@3 ASC,time@6 ASC]
    SortExec: expr=[host@0 ASC, region@1 ASC, city@2 ASC, planet@3 ASC, time@6 ASC, __chunk_order@7 ASC], preserve_partitioning=[false]
      RecordBatchesExec: chunks=1, projection=[host, region, city, planet, usage, temp, time, __chunk_order]

2025-05-29T10:19:36.376111Z DEBUG iox_query::exec::context: Running plan, physical:
ProjectionExec: expr=[host@0 as host, region@1 as region, city@2 as city, planet@3 as planet, usage@4 as usage, temp@5 as temp, time@6 as time]
  DeduplicateExec: [host@0 ASC,region@1 ASC,city@2 ASC,planet@3 ASC,time@6 ASC]
    SortExec: expr=[host@0 ASC, region@1 ASC, city@2 ASC, planet@3 ASC, time@6 ASC, __chunk_order@7 ASC], preserve_partitioning=[false]
      RecordBatchesExec: chunks=1, projection=[host, region, city, planet, usage, temp, time, __chunk_order]

2025-05-29T10:19:36.463712Z DEBUG iox_query::provider::deduplicate: End building stream for DeduplicationExec::execute partition=0
2025-05-29T10:19:36.463954Z ERROR influxdb3_write::write_buffer::queryable_buffer: error during sort, deduplicate, and persist of buffer data as parquet error=failed to execute the sort and deduplication of chunked data from the buffer debug=failed to execute the sort and deduplication of chunked data from the buffer

Caused by:
    0: External error: Execution error for 'deduplicate batches'
       caused by
       Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorter[0] consumed 0 bytes, ExternalSorterMerge[0] consumed 0 bytes. Error: Failed to allocate additional 369485056 bytes for ExternalSorter[0] with 0 bytes already allocated for this reservation - 200000000 bytes remain available for the total pool
    1: Execution error for 'deduplicate batches'
       caused by
       Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorter[0] consumed 0 bytes, ExternalSorterMerge[0] consumed 0 bytes. Error: Failed to allocate additional 369485056 bytes for ExternalSorter[0] with 0 bytes already allocated for this reservation - 200000000 bytes remain available for the total pool
    2: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorter[0] consumed 0 bytes, ExternalSorterMerge[0] consumed 0 bytes. Error: Failed to allocate additional 369485056 bytes for ExternalSorter[0] with 0 bytes already allocated for this reservation - 200000000 bytes remain available for the total pool
2025-05-29T10:19:36.464039Z ERROR panic_logging: Thread panic panic_type="unknown" panic_message="sort, deduplicate, and persist buffer data as parquet: failed to execute the sort and deduplication of chunked data from the buffer\n\nCaused by:\n    0: External error: Execution error for 'deduplicate batches'\n       caused by\n       Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorter[0] consumed 0 bytes, ExternalSorterMerge[0] consumed 0 bytes. Error: Failed to allocate additional 369485056 bytes for ExternalSorter[0] with 0 bytes already allocated for this reservation - 200000000 bytes remain available for the total pool\n    1: Execution error for 'deduplicate batches'\n       caused by\n       Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorter[0] consumed 0 bytes, ExternalSorterMerge[0] consumed 0 bytes. Error: Failed to allocate additional 369485056 bytes for ExternalSorter[0] with 0 bytes already allocated for this reservation - 200000000 bytes remain available for the total pool\n    2: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorter[0] consumed 0 bytes, ExternalSorterMerge[0] consumed 0 bytes. Error: Failed to allocate additional 369485056 bytes for ExternalSorter[0] with 0 bytes already allocated for this reservation - 200000000 bytes remain available for the total pool" panic_file="influxdb3_write/src/write_buffer/queryable_buffer.rs" panic_line=284 panic_column=22

thread 'InfluxDB 3 Core Tokio IO 1' panicked at influxdb3_write/src/write_buffer/queryable_buffer.rs:284:22:
sort, deduplicate, and persist buffer data as parquet: failed to execute the sort and deduplication of chunked data from the buffer

Caused by:
    0: External error: Execution error for 'deduplicate batches'
       caused by
       Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorter[0] consumed 0 bytes, ExternalSorterMerge[0] consumed 0 bytes. Error: Failed to allocate additional 369485056 bytes for ExternalSorter[0] with 0 bytes already allocated for this reservation - 200000000 bytes remain available for the total pool
    1: Execution error for 'deduplicate batches'
       caused by
       Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorter[0] consumed 0 bytes, ExternalSorterMerge[0] consumed 0 bytes. Error: Failed to allocate additional 369485056 bytes for ExternalSorter[0] with 0 bytes already allocated for this reservation - 200000000 bytes remain available for the total pool
    2: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorter[0] consumed 0 bytes, ExternalSorterMerge[0] consumed 0 bytes. Error: Failed to allocate additional 369485056 bytes for ExternalSorter[0] with 0 bytes already allocated for this reservation - 200000000 bytes remain available for the total pool
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
2025-05-29T10:19:36.464303Z ERROR panic_logging: Thread panic panic_type="unknown" panic_message="snapshot failed: RecvError(())" panic_file="/home/praveen/projects/influx/influxdb/influxdb3_write/src/write_buffer/mod.rs" panic_line=642 panic_column=64

thread 'InfluxDB 3 Core Tokio IO 1' panicked at /home/praveen/projects/influx/influxdb/influxdb3_write/src/write_buffer/mod.rs:642:64:
snapshot failed: RecvError(())
2025-05-29T10:19:36.962314Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=1 num_snapshots_after=900
2025-05-29T10:19:36.962335Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748513976590365388 max_timestamp_ns=1748513976959904321 wal_file_number=108
2025-05-29T10:19:36.975847Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 108
2025-05-29T10:19:37.964348Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=2 num_snapshots_after=900
2025-05-29T10:19:37.964369Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748513977592836757 max_timestamp_ns=1748513977961960643 wal_file_number=109
2025-05-29T10:19:37.977489Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 109
2025-05-29T10:19:38.970259Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=3 num_snapshots_after=900
2025-05-29T10:19:38.970293Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748513978596833745 max_timestamp_ns=1748513978967851988 wal_file_number=110
2025-05-29T10:19:39.061068Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 110
2025-05-29T10:19:39.966342Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=4 num_snapshots_after=900
2025-05-29T10:19:39.966363Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748513979599883318 max_timestamp_ns=1748513979963961390 wal_file_number=111
2025-05-29T10:19:40.058067Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 111
2025-05-29T10:19:40.967010Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=5 num_snapshots_after=900
2025-05-29T10:19:40.967032Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748513980604132841 max_timestamp_ns=1748513980964648005 wal_file_number=112
2025-05-29T10:19:40.986782Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 112
2025-05-29T10:19:41.970465Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=6 num_snapshots_after=900
2025-05-29T10:19:41.970501Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748513981607070287 max_timestamp_ns=1748513981968039666 wal_file_number=113
2025-05-29T10:19:42.060526Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 113
2025-05-29T10:19:42.962278Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=7 num_snapshots_after=900
2025-05-29T10:19:42.962299Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748513982608893119 max_timestamp_ns=1748513982959898160 wal_file_number=114
2025-05-29T10:19:42.973385Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 114
2025-05-29T10:19:43.967065Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=8 num_snapshots_after=900
2025-05-29T10:19:43.967098Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748513983611961524 max_timestamp_ns=1748513983964419540 wal_file_number=115
2025-05-29T10:19:44.054496Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 115
2025-05-29T10:19:44.963131Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=9 num_snapshots_after=900
2025-05-29T10:19:44.963152Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748513984613875490 max_timestamp_ns=1748513984960756183 wal_file_number=116
2025-05-29T10:19:44.975679Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 116
2025-05-29T10:19:45.961384Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=10 num_snapshots_after=900
2025-05-29T10:19:45.961420Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748513985617324206 max_timestamp_ns=1748513985958937716 wal_file_number=117
2025-05-29T10:19:45.975825Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 117
2025-05-29T10:19:46.962203Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=11 num_snapshots_after=900
2025-05-29T10:19:46.962234Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748513986621686598 max_timestamp_ns=1748513986959758623 wal_file_number=118
2025-05-29T10:19:46.974045Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 118
2025-05-29T10:19:47.964179Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=12 num_snapshots_after=900
2025-05-29T10:19:47.964198Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748513987624424379 max_timestamp_ns=1748513987961604057 wal_file_number=119
2025-05-29T10:19:47.976751Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 119
2025-05-29T10:19:48.971683Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=13 num_snapshots_after=900
2025-05-29T10:19:48.971718Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748513988626647236 max_timestamp_ns=1748513988969188507 wal_file_number=120
2025-05-29T10:19:49.062055Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 120
2025-05-29T10:19:49.970994Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=14 num_snapshots_after=900
2025-05-29T10:19:49.971027Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748513989630591454 max_timestamp_ns=1748513989968596506 wal_file_number=121
2025-05-29T10:19:50.059433Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 121
2025-05-29T10:19:50.964936Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=15 num_snapshots_after=900
2025-05-29T10:19:50.964970Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748513990633451821 max_timestamp_ns=1748513990962519702 wal_file_number=122
2025-05-29T10:19:50.976724Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 122
2025-05-29T10:19:51.953252Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=16 num_snapshots_after=900

Branch with separate write path executor

  • No OOMs observed at the same throughput of 5.42 MB/s (the snapshot runs and completes)
  • config: 1G max memory, 500M force-snapshot threshold, 200M DF executor pool; additionally, the write path executor has an unbounded memory pool (which means it only dies if the whole process runs out of memory)
  • actual command:
systemd-run --scope  -p MemoryMax=1000M -p CPUQuota=25% ./target/quick-release/influxdb3 serve --node-id node-1 --object-store file --data-dir /home/praveen/projects/influx/test-data/core-perf  --disable-telemetry-upload --snapshotted-wal-files-to-keep 10 --force-snapshot-mem-threshold 500 --exec-mem-pool-bytes 200 --log-filter 'info,iox_query=debug,influxdb3_server::query_executor=warn,influxdb3_server::http=warn,influxdb3_wal=debug,influxdb3_write::write_buffer::queryable_buffer=debug,influxdb3_write::write_buffer::table_buffer=debug,influxdb3::write_buffer=debug,influxdb3_enterprise=debug' --gen1-duration 10m --without-auth
  • logs from the snapshot runs (the snapshot ran twice with no OOM, confirmed by watching the cgroup's memory consumption)

2025-05-29T10:27:10.824466Z  INFO influxdb3_wal::object_store: snapshotting wal snapshot_details=SnapshotDetails { snapshot_sequence_number: SnapshotSequenceNumber(1), end_time_marker: 1748514600000000000, first_wal_sequence_number: WalFileSequenceNumber(1), last_wal_sequence_number: WalFileSequenceNumber(112), forced: true }
2025-05-29T10:27:10.824491Z  INFO influxdb3_write::write_buffer::queryable_buffer: Buffering contents and persisting snapshotted data snapshot_details=SnapshotDetails { snapshot_sequence_number: SnapshotSequenceNumber(1), end_time_marker: 1748514600000000000, first_wal_sequence_number: WalFileSequenceNumber(1), last_wal_sequence_number: WalFileSequenceNumber(112), forced: true }
2025-05-29T10:27:10.825168Z  INFO influxdb3_write::write_buffer::queryable_buffer: persisting 2 chunks for wal number 112
2025-05-29T10:27:10.825185Z  INFO influxdb3_write::write_buffer::queryable_buffer: Persisting 4400000 rows for db id 1 and table id 1 and chunk 1748514000000000000 to file node-1/dbs/foo-1/mem-1/2025-05-29/10-20/0000000112.parquet
2025-05-29T10:27:10.825840Z DEBUG iox_query::frontend::reorg: created compact plan for table table_name="mem" table_id=0 plan=Sort: mem.host ASC NULLS FIRST, mem.region ASC NULLS FIRST, mem.city ASC NULLS FIRST, mem.planet ASC NULLS FIRST [host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, city:Dictionary(Int32, Utf8);N, planet:Dictionary(Int32, Utf8);N, usage:Float64;N, free:Float64;N, time:Timestamp(Nanosecond, None)]
  TableScan: mem [host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, city:Dictionary(Int32, Utf8);N, planet:Dictionary(Int32, Utf8);N, usage:Float64;N, free:Float64;N, time:Timestamp(Nanosecond, None)]
2025-05-29T10:27:10.825885Z DEBUG iox_query::exec::context: create_physical_plan: initial plan text=Sort: mem.host ASC NULLS FIRST, mem.region ASC NULLS FIRST, mem.city ASC NULLS FIRST, mem.planet ASC NULLS FIRST [host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, city:Dictionary(Int32, Utf8);N, planet:Dictionary(Int32, Utf8);N, usage:Float64;N, free:Float64;N, time:Timestamp(Nanosecond, None)]
  TableScan: mem [host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, city:Dictionary(Int32, Utf8);N, planet:Dictionary(Int32, Utf8);N, usage:Float64;N, free:Float64;N, time:Timestamp(Nanosecond, None)]
2025-05-29T10:27:10.827179Z DEBUG iox_query::exec::context: create_physical_plan: plan to run text=ProjectionExec: expr=[host@0 as host, region@1 as region, city@2 as city, planet@3 as planet, usage@4 as usage, free@5 as free, time@6 as time]
  DeduplicateExec: [host@0 ASC,region@1 ASC,city@2 ASC,planet@3 ASC,time@6 ASC]
    SortExec: expr=[host@0 ASC,region@1 ASC,city@2 ASC,planet@3 ASC,time@6 ASC,__chunk_order@7 ASC], preserve_partitioning=[false]
      RecordBatchesExec: chunks=1, projection=[host, region, city, planet, usage, free, time, __chunk_order]

2025-05-29T10:27:10.827208Z DEBUG iox_query::exec::context: Running plan, physical:
ProjectionExec: expr=[host@0 as host, region@1 as region, city@2 as city, planet@3 as planet, usage@4 as usage, free@5 as free, time@6 as time]
  DeduplicateExec: [host@0 ASC,region@1 ASC,city@2 ASC,planet@3 ASC,time@6 ASC]
    SortExec: expr=[host@0 ASC,region@1 ASC,city@2 ASC,planet@3 ASC,time@6 ASC,__chunk_order@7 ASC], preserve_partitioning=[false]
      RecordBatchesExec: chunks=1, projection=[host, region, city, planet, usage, free, time, __chunk_order]

2025-05-29T10:27:10.827228Z  INFO influxdb3_write::write_buffer::queryable_buffer: Persisting 4400000 rows for db id 1 and table id 0 and chunk 1748514000000000000 to file node-1/dbs/foo-1/cpu-0/2025-05-29/10-20/0000000112.parquet
2025-05-29T10:27:10.827319Z DEBUG iox_query::frontend::reorg: created compact plan for table table_name="cpu" table_id=0 plan=Sort: cpu.host ASC NULLS FIRST, cpu.region ASC NULLS FIRST, cpu.city ASC NULLS FIRST, cpu.planet ASC NULLS FIRST [host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, city:Dictionary(Int32, Utf8);N, planet:Dictionary(Int32, Utf8);N, usage:Float64;N, temp:Float64;N, time:Timestamp(Nanosecond, None)]
  TableScan: cpu [host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, city:Dictionary(Int32, Utf8);N, planet:Dictionary(Int32, Utf8);N, usage:Float64;N, temp:Float64;N, time:Timestamp(Nanosecond, None)]
2025-05-29T10:27:10.827329Z DEBUG iox_query::exec::context: create_physical_plan: initial plan text=Sort: cpu.host ASC NULLS FIRST, cpu.region ASC NULLS FIRST, cpu.city ASC NULLS FIRST, cpu.planet ASC NULLS FIRST [host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, city:Dictionary(Int32, Utf8);N, planet:Dictionary(Int32, Utf8);N, usage:Float64;N, temp:Float64;N, time:Timestamp(Nanosecond, None)]
  TableScan: cpu [host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, city:Dictionary(Int32, Utf8);N, planet:Dictionary(Int32, Utf8);N, usage:Float64;N, temp:Float64;N, time:Timestamp(Nanosecond, None)]
2025-05-29T10:27:10.827446Z DEBUG iox_query::provider::deduplicate: End building stream for DeduplicationExec::execute partition=0
2025-05-29T10:27:10.827936Z DEBUG iox_query::exec::context: create_physical_plan: plan to run text=ProjectionExec: expr=[host@0 as host, region@1 as region, city@2 as city, planet@3 as planet, usage@4 as usage, temp@5 as temp, time@6 as time]
  DeduplicateExec: [host@0 ASC,region@1 ASC,city@2 ASC,planet@3 ASC,time@6 ASC]
    SortExec: expr=[host@0 ASC,region@1 ASC,city@2 ASC,planet@3 ASC,time@6 ASC,__chunk_order@7 ASC], preserve_partitioning=[false]
      RecordBatchesExec: chunks=1, projection=[host, region, city, planet, usage, temp, time, __chunk_order]

2025-05-29T10:27:10.827945Z DEBUG iox_query::exec::context: Running plan, physical:
ProjectionExec: expr=[host@0 as host, region@1 as region, city@2 as city, planet@3 as planet, usage@4 as usage, temp@5 as temp, time@6 as time]
  DeduplicateExec: [host@0 ASC,region@1 ASC,city@2 ASC,planet@3 ASC,time@6 ASC]
    SortExec: expr=[host@0 ASC,region@1 ASC,city@2 ASC,planet@3 ASC,time@6 ASC,__chunk_order@7 ASC], preserve_partitioning=[false]
      RecordBatchesExec: chunks=1, projection=[host, region, city, planet, usage, temp, time, __chunk_order]

2025-05-29T10:27:12.122803Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=1 num_snapshots_after=900
2025-05-29T10:27:12.122826Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748514431120942232 max_timestamp_ns=1748514432031246141 wal_file_number=113
2025-05-29T10:27:12.321525Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 113
2025-05-29T10:27:13.623704Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=2 num_snapshots_after=900
2025-05-29T10:27:13.623729Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748514432540162461 max_timestamp_ns=1748514433531234364 wal_file_number=114
2025-05-29T10:27:13.725831Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 114
2025-05-29T10:27:13.730209Z DEBUG iox_query::provider::deduplicate: before sending the left over batch
2025-05-29T10:27:13.825118Z DEBUG iox_query::provider::deduplicate: End building stream for DeduplicationExec::execute partition=0
2025-05-29T10:27:14.826412Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=3 num_snapshots_after=900
2025-05-29T10:27:14.826435Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748514433941005824 max_timestamp_ns=1748514434823038831 wal_file_number=115
2025-05-29T10:27:15.024568Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 115
2025-05-29T10:27:15.456144Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=4 num_snapshots_after=900
2025-05-29T10:27:15.456201Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748514434924952992 max_timestamp_ns=1748514435228744181 wal_file_number=116
2025-05-29T10:27:15.522671Z DEBUG influxdb3_wal::object_store: notify sent to buffer for wal file 116
2025-05-29T10:27:16.629075Z DEBUG influxdb3_wal::snapshot_tracker: wal periods and snapshots wal_periods_len=5 num_snapshots_after=900
2025-05-29T10:27:16.629096Z  INFO influxdb3_wal::object_store: flushing WAL buffer to object store host="node-1" n_ops=1 min_timestamp_ns=1748514435540997759 max_timestamp_ns=1748514436626375480 wal_file_number=117
2025-05-29T10:27:16.727636Z DEBUG iox_query::provider::deduplicate: before sending the left over batch
2025-05-29T10:27:16.821738Z DEBUG iox_query::provider::deduplicate: done sending the left over batch
2025-05-29T10:27:16.821883Z DEBUG iox_query::provider::deduplicate: done sending the left over batch
2025-05-29T10:27:16.825481Z  INFO influxdb3_write::write_buffer::queryable_buffer: Persisted parquet file: node-1/dbs/foo-1/mem-1/2025-05-29/10-20/0000000112.parquet
2025-05-29T10:27:16.834574Z  INFO influxdb3_write::write_buffer::queryable_buffer: Persisted parquet file: node-1/dbs/foo-1/cpu-0/2025-05-29/10-20/0000000112.parquet
  • systemd-cgtop output for system.slice/run-p3076443-i3076743.scope showing that the memory was actually released after each snapshot
// this is from 1st snapshot (961M -> 163.5M after successful snapshot)
CGroup                                                                          Tasks   %CPU   Memory  Input/s Output/s
system.slice/run-p3076443-i3076743.scope                                            9   25.0   961.7M       0B    36.6M
CGroup                                                                          Tasks   %CPU   Memory  Input/s Output/s
system.slice/run-p3076443-i3076743.scope                                           11   24.8   414.6M        -        -
CGroup                                                                          Tasks   %CPU   Memory  Input/s Output/s
system.slice/run-p3076443-i3076743.scope                                           11   20.4   168.8M        -        -
CGroup                                                                          Tasks   %CPU   Memory  Input/s Output/s
system.slice/run-p3076443-i3076743.scope                                           11   12.1   163.5M        -        -
CGroup                                                                          Tasks   %CPU   Memory  Input/s Output/s
system.slice/run-p3076443-i3076743.scope                                           11   15.1     175M        -        -
...

// this is from 2nd snapshot (997M -> 165.3M after successful snapshot)
CGroup                                                                          Tasks   %CPU   Memory  Input/s Output/s
system.slice/run-p3076443-i3076743.scope                                           11   24.9   912.9M        -        -
CGroup                                                                          Tasks   %CPU   Memory  Input/s Output/s
system.slice/run-p3076443-i3076743.scope                                           11   24.8   997.8M        -        -
CGroup                                                                          Tasks   %CPU   Memory  Input/s Output/s
system.slice/run-p3076443-i3076743.scope                                           11   24.8   418.8M        -        -
CGroup                                                                          Tasks   %CPU   Memory  Input/s Output/s
system.slice/run-p3076443-i3076743.scope                                           11   23.7     164M        -        -
CGroup                                                                          Tasks   %CPU   Memory  Input/s Output/s
system.slice/run-p3076443-i3076743.scope                                           11   14.9   165.3M        -        -
CGroup                                                                          Tasks   %CPU   Memory  Input/s Output/s
system.slice/run-p3076443-i3076743.scope                                           11   15.7   188.8M        -        -
CGroup                                                                          Tasks   %CPU   Memory  Input/s Output/s
system.slice/run-p3076443-i3076743.scope                                           11   15.0   194.8M        -        -

@praveen-influx praveen-influx force-pushed the praveen/write-path-executor branch from 946233a to 73f96ce Compare June 2, 2025 08:59
@praveen-influx praveen-influx force-pushed the praveen/write-path-executor branch from 73f96ce to ed5d40c Compare June 2, 2025 09:33
@praveen-influx praveen-influx marked this pull request as ready for review June 2, 2025 11:55
@praveen-influx praveen-influx requested a review from a team June 2, 2025 12:01

@hiltontj hiltontj left a comment


It looks like the results of splitting the executors are good. But, I am hesitant to move forward on this in light of the comment on the metrics registry.

What was the reason that we can't have them share a registry? Maybe we can leave this PR as is, and figure out how to recombine them in follow-on work.

My understanding is that, in the current state, the query-style metrics produced by the write path's executor would not be reported by the Prometheus /metrics HTTP API.

That may not be a big deal, especially if we are reporting a tailored set of metrics for the Core/Enterprise write path from our own code (e.g., we report the ingest rate).

Comment on lines +548 to +551
// When you have extra executor, you need separate metrics registry! It is not clear what
// the impact would be
// TODO: confirm this is not going to mess up downstream metrics consumers
let write_path_metrics = setup_metric_registry();
Contributor


This will likely be an issue, since the HTTP endpoint that serves prometheus (/metrics) assumes a single registry:

fn handle_metrics(&self) -> Result<Response<Body>> {
    let mut body: Vec<u8> = Default::default();
    let mut reporter = metric_exporters::PrometheusTextEncoder::new(&mut body);
    self.common_state.metrics.report(&mut reporter);
    Ok(Response::new(Body::from(body)))
}

Is the issue that using the same registry for multiple executors causes them to overwrite each other, or contend for locks with each other?

Contributor Author


It runs into a panic:

2025-06-02T14:49:39.205230Z ERROR panic_logging: Thread panic panic_type="unknown" panic_message="More than one execution pool created: previously existing instrument" panic_file="/home/praveen/.cargo/git/checkouts/influxdb3_core-2ede6fca005e1dcf/fd0e474/iox_query/src/exec.rs" panic_line=281 panic_column=9

thread 'main' panicked at /home/praveen/.cargo/git/checkouts/influxdb3_core-2ede6fca005e1dcf/fd0e474/iox_query/src/exec.rs:281:9:
More than one execution pool created: previously existing instrument
stack backtrace:
   0:     0x629f44b5e172 - <std::sys::backtrace::BacktraceLock::print::DisplayBacktrace as core::fmt::Display>::fmt::hc04c8f544ab24d66
   1:     0x629f44b8eb63 - core::fmt::write::hfe57b7174b7d8eab
   2:     0x629f44b595a3 - std::io::Write::write_fmt::h154385efa8565236
   3:     0x629f44b5dfc2 - std::sys::backtrace::BacktraceLock::print::h0c8f24e22f5873a8
   4:     0x629f44b5f24c - std::panicking::default_hook::{{closure}}::hd07d57e6a602c8e4
   5:     0x629f44b5f04f - std::panicking::default_hook::h63d12f7d95bd91ed
   6:     0x629f3fd807db - panic_logging::SendPanicsToTracing::new_inner::{{closure}}::h4f1478e3035af477
   7:     0x629f44b5fd43 - std::panicking::rust_panic_with_hook::h33b18b24045abff4
   8:     0x629f44b5f9c6 - std::panicking::begin_panic_handler::{{closure}}::hf8313cc2fd0126bc
   9:     0x629f44b5e679 - std::sys::backtrace::__rust_end_short_backtrace::h57fe07c8aea5c98a
  10:     0x629f44b5f68d - __rustc[95feac21a9532783]::rust_begin_unwind
  11:     0x629f44b8bac0 - core::panicking::panic_fmt::hd54fb667be51beea
  12:     0x629f414b6cb7 - iox_query::exec::Executor::new_with_config_and_executor::h3ef1059edcb25ade
  13:     0x629f3f99fd9c - influxdb3::commands::serve::command::{{closure}}::h2cdf5ca9df83df25
  14:     0x629f3f9b6fcb - influxdb3::main::{{closure}}::hc953cfc298ca6770
  15:     0x629f3f987b39 - tokio::runtime::park::CachedParkThread::block_on::h51b18ac33f8a0e4d
  16:     0x629f3fb2a7bf - tokio::runtime::runtime::Runtime::block_on::h9eb33b87acb6fa53
  17:     0x629f3fc20d55 - influxdb3::main::h75a268e75e689bc6
  18:     0x629f3fcbb256 - std::sys::backtrace::__rust_begin_short_backtrace::h5b4e77177edb3cca
  19:     0x629f3fab4321 - std::rt::lang_start::{{closure}}::hc69eb1d94c6de306
  20:     0x629f44b4e080 - std::rt::lang_start_internal::h418648f91f5be3a1
  21:     0x629f3fc3b19d - main
  22:     0x7ed71c33d488 - <unknown>
  23:     0x7ed71c33d54c - __libc_start_main
  24:     0x629f3f95b325 - _start
  25:                0x0 - <unknown>
2025-06-02T14:49:39.295724Z  WARN executor: DedicatedExecutor dropped without calling shutdown()
2025-06-02T14:49:39.296308Z  WARN executor: DedicatedExecutor dropped without calling shutdown()

I can look into the panic and see if I can address that in a different way if this is going to mess downstream consumers.

Contributor Author


I looked into the code that runs into the panic:

https://github.com/influxdata/influxdb3_core/blob/fd0e474a6c0af5ba867399d753f5df18f59907cb/iox_query/src/exec.rs#L268-L284

It looks like the assumption is that the single memory pool -> executor relationship has been violated if the "datafusion_pool" instrument is already registered. We don't actually break that relationship (in this branch there are two executors and each has its own memory pool, so the relationship is still correct), but because the registry is shared it runs into this error.

I need to spend a bit more time to see whether I can create the executor without hooking it up to metrics to begin with (or use a different instrument name such as "datafusion_write_pool"), and then experiment with how it reports.
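
For context on the panic itself, here is a toy sketch of the pattern (deliberately not the iox metric crate API; names are illustrative only): the executor registers a pool instrument under a fixed name, and registering that same name a second time on the same registry is treated as a violation, even though each executor has its own memory pool. A separate registry, or a distinct instrument name, both avoid the collision.

use std::collections::HashSet;

// Toy stand-in for a metric registry that, like the real one, refuses to
// register the same instrument name twice.
#[derive(Default)]
struct ToyRegistry {
    instruments: HashSet<String>,
}

impl ToyRegistry {
    fn register_instrument(&mut self, name: &str) {
        // The real code panics with "More than one execution pool created:
        // previously existing instrument" in this situation.
        assert!(
            self.instruments.insert(name.to_string()),
            "More than one execution pool created: previously existing instrument"
        );
    }
}

fn main() {
    // Sharing one registry: the second executor's pool instrument collides.
    let mut shared = ToyRegistry::default();
    shared.register_instrument("datafusion_pool"); // query executor
    // shared.register_instrument("datafusion_pool"); // write path executor: would panic

    // Two ways out: a separate registry (what this PR does) ...
    let mut write_path = ToyRegistry::default();
    write_path.register_instrument("datafusion_pool");

    // ... or a distinct instrument name on the shared registry (hypothetical).
    shared.register_instrument("datafusion_write_pool");
}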

Comment on lines +649 to +655
// These are new additions, just skimming through the code it does not look like we can
// achieve the same effect as having a separate executor. It looks like it's for "all"
// queries, it'd be nice to have a filter to say when the query matches this pattern
// apply these limits. If that's possible maybe we could avoid creating a separate
// executor.
per_query_mem_pool_config: PerQueryMemoryPoolConfig::Disabled,
heap_memory_limit: None,
Contributor


Yep, these came in from iox in the most recent sync. They aren't used anywhere in our code yet. I opened #26460 to address using them.


@hiltontj hiltontj left a comment


@praveen-influx and I discussed this.

The sentiment was that to enable the use of a single Registry across multiple Executors would require some upstream work in iox. The issue this change addresses is more important than the drawback of having an unused metric registry, so we can ship this and deal with the latter part in follow-on work.

We can open an issue in iox to summarize the issue and propose a change to support optional or namespaced metric registry on executor creation.

@praveen-influx
Contributor Author

Thanks @hiltontj - I'll merge this in and create a separate issue to track how to expose the write path memory pool metrics.

@praveen-influx praveen-influx merged commit 1c8b428 into main Jun 2, 2025
16 checks passed
@praveen-influx
Contributor Author

Created issue #26484 to address the follow-up work.

Successfully merging this pull request may close these issues.

Persist operations should have no memory limits